//package com.xdl.apitest.tabletest.udftest
//
//import com.xdl.apitest.SensorReading
//import org.apache.flink.api.java.aggregation.AggregationFunction
//import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
//import org.apache.flink.streaming.api.scala._
//import org.apache.flink.streaming.api.windowing.time.Time
//import org.apache.flink.table.api.Table
//import org.apache.flink.table.api.scala._
//import org.apache.flink.table.functions.AggregateFunction
//import org.apache.flink.table.functions.AggregationFunction
//import org.apache.flink.types.Row
//
///**
// * Project: FlinkTutorial
// * Package: com.xdl.apitest
// * Version: 1.0
// *
// * Created by guoxiaolong on 2020-08-01-22:30
// */
//object AggregateFunctionTest {
//  def main(args: Array[String]): Unit = {
//    // 0. Create the stream execution environment
//    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setParallelism(1)
//
//    // 1. Create the table execution environment
//    val tableEnv = StreamTableEnvironment.create(env)
//
//    // 2. Read the source file into a stream
//    val filePath: String = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"
//    val inputStream: DataStream[String] = env.readTextFile(filePath)
//
//    // Map each CSV line into a SensorReading, then assign event-time
//    // timestamps with a 1-second bounded out-of-orderness watermark
//    // (timestamp field is in seconds, hence * 1000L for milliseconds).
//    val dataStream: DataStream[SensorReading] = inputStream
//      .map(data => {
//        val dataArray = data.split(",")
//        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
//      })
//      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
//        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
//      })
//
//    // Convert the stream into a table, appending a processing-time attribute 'pt
//    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
//
//    // Instantiate the user-defined aggregate function
//    val avgTemp = new AvgTemp()
//
//    // Table API call.
//    // FIX: Scala symbol literals have no closing quote — the original
//    // 'temperature' / 'avgTemp' were syntax errors.
//    val resultTable = sensorTable
//      .groupBy('id)
//      .aggregate(avgTemp('temperature) as 'avgTemp)
//      .select('id, 'avgTemp)
//
//    // A grouped aggregation emits updates, so a retract stream is required
//    resultTable.toRetractStream[Row].print("result")
//
//    env.execute("agg udf test job")
//  }
//
//  // Accumulator class holding the running aggregate state (sum, count).
//  // FIX: count must be a var — accumulate() mutates it; it was declared val,
//  // which would not compile.
//  class AvgTempAcc {
//    var sum: Double = 0.0
//    var count: Int = 0
//  }
//
//  // User-defined aggregate function computing the average temperature per key.
//  // FIX: the Table API UDAF base class is AggregateFunction (from
//  // org.apache.flink.table.functions), not AggregationFunction.
//  class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
//
//    // NOTE(review): yields NaN for an empty accumulator (0.0 / 0); the
//    // framework only calls getValue after at least one accumulate per group.
//    override def getValue(accumulator: AvgTempAcc): Double = accumulator.sum / accumulator.count
//
//    override def createAccumulator(): AvgTempAcc = new AvgTempAcc()
//
//    // Called by the framework once per input row of the group (resolved by
//    // reflection — the name and first ACC parameter are part of the contract)
//    def accumulate(acc: AvgTempAcc, temp: Double): Unit = {
//      acc.sum += temp
//      acc.count += 1
//    }
//  }
//
//}
